{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Running Dask on the cluster with mlrun"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The dask frameworks enables users to parallelize their python code and run it as a distributed process on Iguazio cluster and dramatically accelerate their performance. <br>\n",
    "In this notebook you'll learn how to create a dask cluster and then an mlrun function running as a dask client. <br>\n",
    "It also demonstrates how to run parallelize custom algorithm using Dask Delayed option\n",
    "\n",
    "For more information on dask over kubernetes: https://kubernetes.dask.org/en/latest/"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Basic configuration"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Import mlrun and dask. nuclio is used just to convert the code into an mlrun function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Make sure thar mlrun is installed. if it's already installed then skip this step\n",
    "#to instlal mlrun run the following\n",
    "\n",
    "!/User/align_mlrun.sh"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load sample data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "!mkdir -p /User/examples/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current\n",
      "                                 Dload  Upload   Total   Spent    Left  Speed\n",
      "100 84.9M  100 84.9M    0     0  21.8M      0  0:00:03  0:00:03 --:--:-- 21.8M\n"
     ]
    }
   ],
   "source": [
    "%%sh\n",
    "CSV_PATH=\"/User/examples/ytrip.csv\"\n",
    "curl -L \"https://s3.wasabisys.com/iguazio/data/Taxi/yellow_tripdata_2019-01_subset.csv\" > ${CSV_PATH}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# nuclio: ignore\n",
    "import nuclio "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "%nuclio: setting kind to 'job'\n",
      "%nuclio: setting spec.image to 'mlrun/ml-models'\n"
     ]
    }
   ],
   "source": [
    "# nuclio: start-code\n",
    "%nuclio config kind = \"job\"\n",
    "%nuclio config spec.image = \"mlrun/ml-models\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "from mlrun.execution import MLClientCtx\n",
    "from mlrun.datastore import DataItem\n",
    "\n",
    "from dask.distributed import Client\n",
    "from dask import delayed\n",
    "from dask import dataframe as dd\n",
    "\n",
    "import warnings\n",
    "import numpy as np\n",
    "import os\n",
    "\n",
    "warnings.filterwarnings(\"ignore\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create a python function"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This simple function reads a csv file using dask dataframe and run group by and describe function on the dataset <br>\n",
    "It also shows how to use the dask delayed function to run a python API that is not natively supported by Dask and leverage dask to run it as a distributed process . <br>\n",
    "In this case we run numpy asmatrix which Interpret the input as a matrix. Using Dask Delayed it runs it in parallel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "def test_dask(context: MLClientCtx,\n",
    "              dataset: DataItem,\n",
    "              dask_client: str=None) -> None:\n",
    "    \n",
    "    if dask_client:\n",
    "        client = Client(dask_client)\n",
    "    else:\n",
    "        client = Client()\n",
    "        \n",
    "    df = dataset.as_df(df_module=dd)\n",
    "    df_describe = df.describe().compute()\n",
    "    df_grpby = df.groupby(\"VendorID\").count().compute()\n",
    "    df_matrix = delayed(np.asmatrix)(df).compute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "# nuclio: end-code"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Set up the enviroment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "import mlrun\n",
    "artifact_path = mlrun.set_environment(api_path = mlrun.mlconf.dbpath or 'http://mlrun-api:8080',\n",
    "                                      artifact_path = os.path.abspath('./'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Convert the code to MLrun function"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use code_to_function to convert the code to MLrun and specify the configuration for the dask process (e.g. replicas, memory etc) <br>\n",
    "Note that the resource configurations are per worker"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "fn = mlrun.code_to_function(\"test_dask\",  kind='job', handler=\"test_dask\").apply(mlrun.mount_v3io())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Init dask cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2020-12-20 09:58:46,894 [info] using in-cluster config.\n"
     ]
    }
   ],
   "source": [
    "dsf = mlrun.new_function(\"dask_init\", kind='dask', image='mlrun/ml-models').apply(mlrun.mount_v3io())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "dsf.spec.remote = True\n",
    "dsf.spec.replicas = 2\n",
    "dsf.spec.max_replicas = 4\n",
    "dsf.spec.service_type = \"NodePort\"\n",
    "dsf.with_requests(mem='2G', cpu='2')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2020-12-20 09:58:46,980 [info] trying dask client at: tcp://mlrun-dask-init-9d8122b2-b.default-tenant:8786\n",
      "> 2020-12-20 09:58:47,000 [info] using remote dask scheduler (mlrun-dask-init-9d8122b2-b) at: tcp://mlrun-dask-init-9d8122b2-b.default-tenant:8786\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<a href=\"http://default-tenant.app.product-new.iguazio-cd2.com:31433/status\" target=\"_blank\" >dashboard link: default-tenant.app.product-new.iguazio-cd2.com:31433</a>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/html": [
       "<table style=\"border: 2px solid white;\">\n",
       "<tr>\n",
       "<td style=\"vertical-align: top; border: 0px solid white\">\n",
       "<h3 style=\"text-align: left;\">Client</h3>\n",
       "<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n",
       "  <li><b>Scheduler: </b>tcp://mlrun-dask-init-9d8122b2-b.default-tenant:8786</li>\n",
       "  <li><b>Dashboard: </b><a href='http://mlrun-dask-init-9d8122b2-b.default-tenant:8787/status' target='_blank'>http://mlrun-dask-init-9d8122b2-b.default-tenant:8787/status</a></li>\n",
       "</ul>\n",
       "</td>\n",
       "<td style=\"vertical-align: top; border: 0px solid white\">\n",
       "<h3 style=\"text-align: left;\">Cluster</h3>\n",
       "<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n",
       "  <li><b>Workers: </b>1</li>\n",
       "  <li><b>Cores: </b>1</li>\n",
       "  <li><b>Memory: </b>4.15 GB</li>\n",
       "</ul>\n",
       "</td>\n",
       "</tr>\n",
       "</table>"
      ],
      "text/plain": [
       "<Client: 'tcp://10.200.0.53:8786' processes=1 threads=1, memory=4.15 GB>"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "dsf.client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Replace the Dask_Clinet with the client scheduler address (see above)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "DATA_URL = '/User/examples/ytrip.csv'\n",
    "DASK_CLIENT = '<scheduler address - see above>'\n",
    "# e.g. DASK_CLIENT = 'tcp://mlrun-dask-init-9d8122b2-b.default-tenant:8786'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Run the function"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When running the function you would see a link as part of the result. click on this link takes you to the dask monitoring dashboard"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2020-12-20 09:59:03,605 [info] starting run test-dask-test_dask uid=7e86bbe9112a486b84e9aa586638fd66 DB=http://mlrun-api:8080\n",
      "> 2020-12-20 09:59:03,761 [info] Job is running in the background, pod: test-dask-test-dask-8ws4f\n",
      "> 2020-12-20 09:59:20,372 [info] run executed, status=completed\n",
      "final state: completed\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<style>\n",
       ".dictlist {\n",
       "  background-color: #4EC64B;\n",
       "  text-align: center;\n",
       "  margin: 4px;\n",
       "  border-radius: 3px; padding: 0px 3px 1px 3px; display: inline-block;}\n",
       ".artifact {\n",
       "  cursor: pointer;\n",
       "  background-color: #4EC64B;\n",
       "  text-align: left;\n",
       "  margin: 4px; border-radius: 3px; padding: 0px 3px 1px 3px; display: inline-block;\n",
       "}\n",
       "div.block.hidden {\n",
       "  display: none;\n",
       "}\n",
       ".clickable {\n",
       "  cursor: pointer;\n",
       "}\n",
       ".ellipsis {\n",
       "  display: inline-block;\n",
       "  max-width: 60px;\n",
       "  white-space: nowrap;\n",
       "  overflow: hidden;\n",
       "  text-overflow: ellipsis;\n",
       "}\n",
       ".master-wrapper {\n",
       "  display: flex;\n",
       "  flex-flow: row nowrap;\n",
       "  justify-content: flex-start;\n",
       "  align-items: stretch;\n",
       "}\n",
       ".master-tbl {\n",
       "  flex: 3\n",
       "}\n",
       ".master-wrapper > div {\n",
       "  margin: 4px;\n",
       "  padding: 10px;\n",
       "}\n",
       "iframe.fileview {\n",
       "  border: 0 none;\n",
       "  height: 100%;\n",
       "  width: 100%;\n",
       "  white-space: pre-wrap;\n",
       "}\n",
       ".pane-header-title {\n",
       "  width: 80%;\n",
       "  font-weight: 500;\n",
       "}\n",
       ".pane-header {\n",
       "  line-height: 1;\n",
       "  background-color: #4EC64B;\n",
       "  padding: 3px;\n",
       "}\n",
       ".pane-header .close {\n",
       "  font-size: 20px;\n",
       "  font-weight: 700;\n",
       "  float: right;\n",
       "  margin-top: -5px;\n",
       "}\n",
       ".master-wrapper .right-pane {\n",
       "  border: 1px inset silver;\n",
       "  width: 40%;\n",
       "  min-height: 300px;\n",
       "  flex: 3\n",
       "  min-width: 500px;\n",
       "}\n",
       ".master-wrapper * {\n",
       "  box-sizing: border-box;\n",
       "}\n",
       "</style><script>\n",
       "function copyToClipboard(fld) {\n",
       "    if (document.queryCommandSupported && document.queryCommandSupported('copy')) {\n",
       "        var textarea = document.createElement('textarea');\n",
       "        textarea.textContent = fld.innerHTML;\n",
       "        textarea.style.position = 'fixed';\n",
       "        document.body.appendChild(textarea);\n",
       "        textarea.select();\n",
       "\n",
       "        try {\n",
       "            return document.execCommand('copy'); // Security exception may be thrown by some browsers.\n",
       "        } catch (ex) {\n",
       "\n",
       "        } finally {\n",
       "            document.body.removeChild(textarea);\n",
       "        }\n",
       "    }\n",
       "}\n",
       "function expandPanel(el) {\n",
       "  const panelName = \"#\" + el.getAttribute('paneName');\n",
       "  console.log(el.title);\n",
       "\n",
       "  document.querySelector(panelName + \"-title\").innerHTML = el.title\n",
       "  iframe = document.querySelector(panelName + \"-body\");\n",
       "\n",
       "  const tblcss = `<style> body { font-family: Arial, Helvetica, sans-serif;}\n",
       "    #csv { margin-bottom: 15px; }\n",
       "    #csv table { border-collapse: collapse;}\n",
       "    #csv table td { padding: 4px 8px; border: 1px solid silver;} </style>`;\n",
       "\n",
       "  function csvToHtmlTable(str) {\n",
       "    return '<div id=\"csv\"><table><tr><td>' +  str.replace(/[\\n\\r]+$/g, '').replace(/[\\n\\r]+/g, '</td></tr><tr><td>')\n",
       "      .replace(/,/g, '</td><td>') + '</td></tr></table></div>';\n",
       "  }\n",
       "\n",
       "  function reqListener () {\n",
       "    if (el.title.endsWith(\".csv\")) {\n",
       "      iframe.setAttribute(\"srcdoc\", tblcss + csvToHtmlTable(this.responseText));\n",
       "    } else {\n",
       "      iframe.setAttribute(\"srcdoc\", this.responseText);\n",
       "    }\n",
       "    console.log(this.responseText);\n",
       "  }\n",
       "\n",
       "  const oReq = new XMLHttpRequest();\n",
       "  oReq.addEventListener(\"load\", reqListener);\n",
       "  oReq.open(\"GET\", el.title);\n",
       "  oReq.send();\n",
       "\n",
       "\n",
       "  //iframe.src = el.title;\n",
       "  const resultPane = document.querySelector(panelName + \"-pane\");\n",
       "  if (resultPane.classList.contains(\"hidden\")) {\n",
       "    resultPane.classList.remove(\"hidden\");\n",
       "  }\n",
       "}\n",
       "function closePanel(el) {\n",
       "  const panelName = \"#\" + el.getAttribute('paneName')\n",
       "  const resultPane = document.querySelector(panelName + \"-pane\");\n",
       "  if (!resultPane.classList.contains(\"hidden\")) {\n",
       "    resultPane.classList.add(\"hidden\");\n",
       "  }\n",
       "}\n",
       "\n",
       "</script>\n",
       "<div class=\"master-wrapper\">\n",
       "  <div class=\"block master-tbl\"><div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th>project</th>\n",
       "      <th>uid</th>\n",
       "      <th>iter</th>\n",
       "      <th>start</th>\n",
       "      <th>state</th>\n",
       "      <th>name</th>\n",
       "      <th>labels</th>\n",
       "      <th>inputs</th>\n",
       "      <th>parameters</th>\n",
       "      <th>results</th>\n",
       "      <th>artifacts</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <td>default</td>\n",
       "      <td><div title=\"7e86bbe9112a486b84e9aa586638fd66\"><a href=\"https://mlrun-ui.default-tenant.app.product-new.iguazio-cd2.com/projects/default/jobs/monitor/7e86bbe9112a486b84e9aa586638fd66/info\" target=\"_blank\" >...6638fd66</a></div></td>\n",
       "      <td>0</td>\n",
       "      <td>Dec 20 09:59:09</td>\n",
       "      <td>completed</td>\n",
       "      <td>test-dask-test_dask</td>\n",
       "      <td><div class=\"dictlist\">v3io_user=admin</div><div class=\"dictlist\">kind=job</div><div class=\"dictlist\">owner=admin</div><div class=\"dictlist\">host=test-dask-test-dask-8ws4f</div></td>\n",
       "      <td><div class=\"artifact\" onclick=\"expandPanel(this)\" paneName=\"resultf521b185\" title=\"/files/examples/ytrip.csv\">dataset</div></td>\n",
       "      <td><div class=\"dictlist\">dask_client=tcp://mlrun-dask-init-9d8122b2-b.default-tenant:8786</div></td>\n",
       "      <td></td>\n",
       "      <td></td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div></div>\n",
       "  <div id=\"resultf521b185-pane\" class=\"right-pane block hidden\">\n",
       "    <div class=\"pane-header\">\n",
       "      <span id=\"resultf521b185-title\" class=\"pane-header-title\">Title</span>\n",
       "      <span onclick=\"closePanel(this)\" paneName=\"resultf521b185\" class=\"close clickable\">&times;</span>\n",
       "    </div>\n",
       "    <iframe class=\"fileview\" id=\"resultf521b185-body\"></iframe>\n",
       "  </div>\n",
       "</div>\n"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "to track results use .show() or .logs() or in CLI: \n",
      "!mlrun get run 7e86bbe9112a486b84e9aa586638fd66 --project default , !mlrun logs 7e86bbe9112a486b84e9aa586638fd66 --project default\n",
      "> 2020-12-20 09:59:22,969 [info] run executed, status=completed\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<mlrun.model.RunObject at 0x7fb52a0e8550>"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "fn.run(handler = test_dask,\n",
    "       inputs={\"dataset\": DATA_URL},\n",
    "       params={\"dask_client\": DASK_CLIENT})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Track the progress in the UI"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Users can view the progress and detailed information in the mlrun UI by clicking on the uid above. <br>\n",
    "Also, to track the dask progress in the dask UI click on the \"dashboard link\" above the \"client\" section"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python [conda env:root] *",
   "language": "python",
   "name": "conda-root-py"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
